{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "650462ec",
   "metadata": {},
   "source": [
    "# Using FastAPI to Run FastKafka Application"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "21a8701d",
   "metadata": {},
   "source": [
    "When deploying a FastKafka application, the default approach is to utilize the [`fastkafka run`](/docs/cli/fastkafka#fastkafka-run) CLI command. This command allows you to launch your FastKafka application as a standalone service. However, if you already have a FastAPI application in place and wish to run FastKafka application alongside it, you have an alternative option.\n",
    "\n",
    "FastKafka provides a method called `FastKafka.fastapi_lifespan` that leverages [FastAPI's lifespan](https://fastapi.tiangolo.com/advanced/events/#lifespan-events) feature. This method allows you to run your FastKafka application together with your existing FastAPI app, seamlessly integrating their functionalities. By using the `FastKafka.fastapi_lifespan` method, you can start the FastKafka application within the same process as the FastAPI app.\n",
    "\n",
    "The `FastKafka.fastapi_lifespan` method ensures that both FastAPI and FastKafka are initialized and start working simultaneously. This approach enables the execution of Kafka-related tasks, such as producing and consuming messages, while also handling HTTP requests through FastAPI's routes.\n",
    "\n",
    "By combining FastAPI and FastKafka in this manner, you can build a comprehensive application that harnesses the power of both frameworks. Whether you require real-time messaging capabilities or traditional HTTP endpoints, this approach allows you to leverage the strengths of FastAPI and FastKafka within a single deployment setup."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "89f5a924",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "\n",
    "1. A basic knowledge of `FastKafka` is needed to proceed with this guide. If you are not familiar with `FastKafka`, please go through the [tutorial](/docs#tutorial) first.\n",
    "2. `FastKafka` and `FastAPI` libraries needs to be installed."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7af8a037",
   "metadata": {},
   "source": [
    "This guide will provide a step-by-step explanation, taking you through each stage individually, before combining all the components in the final section for a comprehensive understanding of the process."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "40ecc6d5",
   "metadata": {},
   "source": [
    "## 1. Basic FastKafka app\n",
    "\n",
    "In this step, we will begin by creating a simple FastKafka application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abd847c7",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pydantic import BaseModel, Field, NonNegativeFloat\n",
    "from typing import *\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"url\": \"localhost\",\n",
    "        \"description\": \"local development kafka broker\",\n",
    "        \"port\": 9092,\n",
    "    },\n",
    "    \"production\": {\n",
    "        \"url\": \"kafka.airt.ai\",\n",
    "        \"description\": \"production kafka broker\",\n",
    "        \"port\": 9092,\n",
    "        \"protocol\": \"kafka-secure\",\n",
    "        \"security\": {\"type\": \"plain\"},\n",
    "    },\n",
    "}\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    title=\"Greetings\",\n",
    "    kafka_brokers=kafka_brokers,\n",
    ")\n",
    "\n",
    "\n",
    "class TestMsg(BaseModel):\n",
    "    msg: str = Field(...)\n",
    "\n",
    "\n",
    "@kafka_app.consumes()\n",
    "async def on_names(msg: TestMsg):\n",
    "    await to_greetings(TestMsg(msg=f\"Hello {msg.msg}\"))\n",
    "\n",
    "\n",
    "@kafka_app.produces()\n",
    "async def to_greetings(greeting: TestMsg) -> TestMsg:\n",
    "    return greeting"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f7328cb1",
   "metadata": {},
   "source": [
    "In the above example, we consume messages from a topic called `names`, we prepend \"Hello\" to the message, and send it back to another topic called `greetings`.\n",
    "\n",
    "We now have a simple `FastKafka` app to produce and consume from two topics."
   ]
  },
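  {
   "cell_type": "markdown",
   "id": "5c2e9b7d",
   "metadata": {},
   "source": [
    "The decorators in the example above do not name a topic, yet the app works with the `names` and `greetings` topics, which match the function names `on_names` and `to_greetings` without their prefixes. The following is a minimal, standard-library-only sketch of that naming convention. The helper `infer_topic` is hypothetical and written only for illustration; it is not part of the FastKafka API.\n",
    "\n",
    "```python\n",
    "def infer_topic(func_name: str, prefix: str) -> str:\n",
    "    # Hypothetical helper: derive a topic name by stripping the prefix\n",
    "    if not func_name.startswith(prefix) or func_name == prefix:\n",
    "        raise ValueError(f\"{func_name!r} needs the form {prefix!r} + topic name\")\n",
    "    return func_name[len(prefix):]\n",
    "\n",
    "\n",
    "# Function names taken from the example above\n",
    "consume_topic = infer_topic(\"on_names\", prefix=\"on_\")\n",
    "produce_topic = infer_topic(\"to_greetings\", prefix=\"to_\")\n",
    "print(consume_topic, produce_topic)\n",
    "```\n",
    "\n",
    "If a function name does not fit this pattern, passing the topic explicitly, as in `@kafka_app.consumes(topic=\"names\")`, avoids relying on the convention."
   ]
  },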
  {
   "cell_type": "markdown",
   "id": "1185a1df",
   "metadata": {},
   "source": [
    "## 2. Using fastapi_lifespan method\n",
    "\n",
    "In this step of the guide, we will explore the integration of a FastKafka application with a FastAPI application using the `FastKafka.fastapi_lifespan` method. \n",
    "The `FastKafka.fastapi_lifespan` method is a feature provided by FastKafka, which allows you to seamlessly integrate a FastKafka application with a FastAPI application by leveraging FastAPI's lifespan feature."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e9cfed2",
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi import FastAPI\n",
    "\n",
    "fastapi_app = FastAPI(lifespan=kafka_app.fastapi_lifespan(kafka_broker_name=\"localhost\"))\n",
    "\n",
    "\n",
    "@fastapi_app.get(\"/hello\")\n",
    "async def hello():\n",
    "    return {\"msg\": \"hello there\"}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "144e4767",
   "metadata": {},
   "source": [
    "In the above example, a new instance of the `FastAPI` app is created, and when the app is started using uvicorn, it also runs the `FastKafka` application concurrently."
   ]
  },
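  {
   "cell_type": "markdown",
   "id": "8d41f6a3",
   "metadata": {},
   "source": [
    "FastAPI's lifespan feature expects an async context manager: code before `yield` runs at startup and code after it runs at shutdown. The sketch below imitates that pattern with the standard library only, so it runs without FastAPI or a Kafka broker. The names `lifespan`, `background_worker`, and `events` are hypothetical, and this is not FastKafka's actual implementation; it only illustrates how background work can be tied to an application's startup and shutdown.\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "from contextlib import asynccontextmanager\n",
    "\n",
    "events = []  # records the order of startup and shutdown steps\n",
    "\n",
    "\n",
    "async def background_worker():\n",
    "    # Stand-in for the Kafka consumer and producer loops\n",
    "    events.append(\"worker started\")\n",
    "    try:\n",
    "        await asyncio.Event().wait()  # run until cancelled\n",
    "    finally:\n",
    "        events.append(\"worker stopped\")\n",
    "\n",
    "\n",
    "@asynccontextmanager\n",
    "async def lifespan(app):\n",
    "    # Startup: launch the background work before requests are served\n",
    "    task = asyncio.create_task(background_worker())\n",
    "    await asyncio.sleep(0)  # give the worker a chance to start\n",
    "    try:\n",
    "        yield\n",
    "    finally:\n",
    "        # Shutdown: stop the background work when the server exits\n",
    "        task.cancel()\n",
    "        await asyncio.gather(task, return_exceptions=True)\n",
    "\n",
    "\n",
    "async def main():\n",
    "    # A web server would enter the context at startup and leave it at shutdown\n",
    "    async with lifespan(app=None):\n",
    "        events.append(\"serving requests\")\n",
    "\n",
    "\n",
    "asyncio.run(main())\n",
    "print(events)\n",
    "```\n",
    "\n",
    "In the real application, the handler returned by `kafka_app.fastapi_lifespan(...)` takes the place of `lifespan` here and is passed to `FastAPI(lifespan=...)`."
   ]
  },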
  {
   "cell_type": "markdown",
   "id": "9a9a0851",
   "metadata": {},
   "source": [
    "## Putting it all together\n",
    "\n",
    "Let's put the above code together and write it in a file called `fast_apps.py`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "856a25e9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "\n",
       "```python\n",
       "# content of the \"fast_apps.py\" file\n",
       "\n",
       "from pydantic import BaseModel, Field, NonNegativeFloat\n",
       "from typing import *\n",
       "\n",
       "from fastkafka import FastKafka\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"localhost\": {\n",
       "        \"url\": \"localhost\",\n",
       "        \"description\": \"local development kafka broker\",\n",
       "        \"port\": 9092,\n",
       "    },\n",
       "    \"production\": {\n",
       "        \"url\": \"kafka.airt.ai\",\n",
       "        \"description\": \"production kafka broker\",\n",
       "        \"port\": 9092,\n",
       "        \"protocol\": \"kafka-secure\",\n",
       "        \"security\": {\"type\": \"plain\"},\n",
       "    },\n",
       "}\n",
       "\n",
       "kafka_app = FastKafka(\n",
       "    title=\"Greetings\",\n",
       "    kafka_brokers=kafka_brokers,\n",
       ")\n",
       "\n",
       "\n",
       "class TestMsg(BaseModel):\n",
       "    msg: str = Field(...)\n",
       "\n",
       "\n",
       "@kafka_app.consumes()\n",
       "async def on_names(msg: TestMsg):\n",
       "    await to_greetings(TestMsg(msg=f\"Hello {msg.msg}\"))\n",
       "\n",
       "\n",
       "@kafka_app.produces()\n",
       "async def to_greetings(greeting: TestMsg) -> TestMsg:\n",
       "    return greeting\n",
       "\n",
       "\n",
       "from fastapi import FastAPI\n",
       "\n",
       "fastapi_app = FastAPI(lifespan=kafka_app.fastapi_lifespan(\"localhost\"))\n",
       "\n",
       "@fastapi_app.get(\"/hello\")\n",
       "async def hello():\n",
       "    return {\"msg\": \"hello there\"}\n",
       "\n",
       "```\n"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "from IPython.display import Markdown\n",
    "\n",
    "kafka_app_source = \"\"\"\n",
    "from pydantic import BaseModel, Field, NonNegativeFloat\n",
    "from typing import *\n",
    "\n",
    "from fastkafka import FastKafka\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"localhost\": {\n",
    "        \"url\": \"localhost\",\n",
    "        \"description\": \"local development kafka broker\",\n",
    "        \"port\": 9092,\n",
    "    },\n",
    "    \"production\": {\n",
    "        \"url\": \"kafka.airt.ai\",\n",
    "        \"description\": \"production kafka broker\",\n",
    "        \"port\": 9092,\n",
    "        \"protocol\": \"kafka-secure\",\n",
    "        \"security\": {\"type\": \"plain\"},\n",
    "    },\n",
    "}\n",
    "\n",
    "kafka_app = FastKafka(\n",
    "    title=\"Greetings\",\n",
    "    kafka_brokers=kafka_brokers,\n",
    ")\n",
    "\n",
    "\n",
    "class TestMsg(BaseModel):\n",
    "    msg: str = Field(...)\n",
    "\n",
    "\n",
    "@kafka_app.consumes()\n",
    "async def on_names(msg: TestMsg):\n",
    "    await to_greetings(TestMsg(msg=f\"Hello {msg.msg}\"))\n",
    "\n",
    "\n",
    "@kafka_app.produces()\n",
    "async def to_greetings(greeting: TestMsg) -> TestMsg:\n",
    "    return greeting\n",
    "\n",
    "\n",
    "from fastapi import FastAPI\n",
    "\n",
    "fastapi_app = FastAPI(lifespan=kafka_app.fastapi_lifespan(kafka_broker_name=\"localhost\"))\n",
    "\n",
    "@fastapi_app.get(\"/hello\")\n",
    "async def hello():\n",
    "    return {\"msg\": \"hello there\"}\n",
    "\"\"\"\n",
    "\n",
    "with open(\"fast_apps.py\", \"w\") as source:\n",
    "    source.write(kafka_app_source)\n",
    "\n",
    "Markdown(\n",
    "    f\"\"\"\n",
    "```python\n",
    "# content of the \"fast_apps.py\" file\n",
    "{kafka_app_source}\n",
    "```\n",
    "\"\"\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cc797d12",
   "metadata": {},
   "source": [
    "Finally, you can run the FastAPI application using a web server of your choice, such as Uvicorn or Hypercorn by running the below command:\n",
    "```cmd\n",
    "uvicorn fast_apps:fastapi_app --host=0.0.0.0 --port=8080\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "289aff74",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23-05-19 12:44:13.150 [INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "23-05-19 12:44:13.151 [INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "23-05-19 12:44:13.151 [INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "23-05-19 12:44:13.152 [INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "23-05-19 12:44:13.153 [INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "23-05-19 12:44:13.767 [INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "23-05-19 12:44:15.435 [INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "23-05-19 12:44:16.894 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "23-05-19 12:44:16.908 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "23-05-19 12:44:16.909 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
      "23-05-19 12:44:16.909 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'group_id': 'app_for_tester_group', 'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
      "23-05-19 12:44:16.914 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
      "23-05-19 12:44:16.915 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
      "23-05-19 12:44:16.915 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
      "23-05-19 12:44:16.916 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'names'})\n",
      "23-05-19 12:44:16.916 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'names'}\n",
      "23-05-19 12:44:16.916 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
      "23-05-19 12:44:16.921 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
      "23-05-19 12:44:16.921 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'greetings'})\n",
      "23-05-19 12:44:16.922 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'greetings'}\n",
      "23-05-19 12:44:16.922 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
      "23-05-19 12:44:16.923 [INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 0 for group app_for_tester_group\n",
      "23-05-19 12:44:16.923 [INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group app_for_tester_group\n",
      "23-05-19 12:44:16.923 [INFO] aiokafka.consumer.group_coordinator: (Re-)joining group app_for_tester_group\n",
      "23-05-19 12:44:16.926 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'greetings': 1}. \n",
      "23-05-19 12:44:16.941 [INFO] aiokafka.consumer.group_coordinator: Joined group 'app_for_tester_group' (generation 1) with member_id aiokafka-0.8.0-f6d0234a-2fdd-420c-9770-f521138d7ba4\n",
      "23-05-19 12:44:16.942 [INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin\n",
      "23-05-19 12:44:16.975 [INFO] aiokafka.consumer.group_coordinator: Successfully synced group app_for_tester_group with generation 1\n",
      "23-05-19 12:44:16.976 [INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='names', partition=0)} for group app_for_tester_group\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:     Started server process [574015]\n",
      "INFO:     Waiting for application startup.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23-05-19 12:44:19.946 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'\n",
      "23-05-19 12:44:19.947 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n",
      "23-05-19 12:44:19.953 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
      "23-05-19 12:44:19.954 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:     Application startup complete.\n",
      "INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23-05-19 12:44:19.961 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
      "23-05-19 12:44:19.961 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'names'})\n",
      "23-05-19 12:44:19.962 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'names'}\n",
      "23-05-19 12:44:19.963 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
      "23-05-19 12:44:19.966 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'names': 1}. \n",
      "INFO:     127.0.0.1:56272 - \"GET /hello HTTP/1.1\" 200 OK\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:     Shutting down\n",
      "INFO:     Waiting for application shutdown.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23-05-19 12:44:26.148 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
      "23-05-19 12:44:26.149 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:     Application shutdown complete.\n",
      "INFO:     Finished server process [574015]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23-05-19 12:44:26.157 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
      "23-05-19 12:44:26.157 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
      "23-05-19 12:44:26.266 [INFO] aiokafka.consumer.group_coordinator: LeaveGroup request succeeded\n",
      "23-05-19 12:44:26.268 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
      "23-05-19 12:44:26.268 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
      "23-05-19 12:44:26.269 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 572846...\n",
      "23-05-19 12:44:27.853 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 572846 terminated.\n",
      "23-05-19 12:44:27.854 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 572474...\n",
      "23-05-19 12:44:29.183 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 572474 terminated.\n",
      "ok\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "from fastkafka.testing import Tester\n",
    "from fastkafka._server import run_in_process\n",
    "import uvicorn\n",
    "from fastkafka import FastKafka\n",
    "import asyncio\n",
    "import requests\n",
    "\n",
    "\n",
    "def run_uvicorn():\n",
    "    uvicorn.run(\n",
    "        \"fast_apps:fastapi_app\",\n",
    "        host=\"0.0.0.0\",\n",
    "        port=8080,\n",
    "        reload=False,\n",
    "        log_level=\"debug\",\n",
    "        workers=1,\n",
    "    )\n",
    "\n",
    "\n",
    "app_for_tester = FastKafka(\n",
    "    kafka_brokers=dict(localhost=dict(url=\"localhost\", port=9092)),\n",
    "    group_id=\"app_for_tester_group\",\n",
    ")\n",
    "\n",
    "\n",
    "class TestMsg(BaseModel):\n",
    "    msg: str = Field(...)\n",
    "\n",
    "\n",
    "@app_for_tester.consumes(topic=\"names\")\n",
    "async def on_app_for_tester_names(msg: TestMsg):\n",
    "    await to_app_for_tester_greetings(TestMsg(msg=f\"Hello {msg.msg}\"))\n",
    "\n",
    "\n",
    "@app_for_tester.produces(topic=\"greetings\")\n",
    "async def to_app_for_tester_greetings(greeting: TestMsg) -> TestMsg:\n",
    "    return greeting\n",
    "\n",
    "\n",
    "async with Tester(app_for_tester) as tester:\n",
    "    with run_in_process(run_uvicorn) as p:\n",
    "        await asyncio.sleep(3)\n",
    "        res = requests.get(\"http://127.0.0.1:8080/hello\")\n",
    "        assert res.ok\n",
    "\n",
    "        await tester.to_names(TestMsg(msg=f\"signal 10\"))\n",
    "        await asyncio.sleep(3)\n",
    "        assert (\n",
    "            tester.mocks.on_greetings.call_count == 2\n",
    "        ), tester.mocks.on_greetings.call_count\n",
    "\n",
    "    p.close()\n",
    "\n",
    "print(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a6d56fba",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
